In this tutorial we solve the optimal control problem
$$\min J(y, u) = \frac{1}{2} \int_{\Gamma_{obs}} (v - v_d)^2 dx + \frac{\alpha_1}{2} \int_{\Gamma_C} |\nabla_{\mathbf{t}} u|^2 ds + \frac{\alpha_2}{2} \int_{\Gamma_C} |u|^2 ds$$ s.t. $$\begin{cases} - \nu \Delta v + \nabla p = f & \text{in } \Omega\\ \text{div} v = 0 & \text{in } \Omega\\ v = g & \text{on } \Gamma_{in}\\ v = 0 & \text{on } \Gamma_{w}\\ p n - \nu \partial_n v = u & \text{on } \Gamma_{C} \end{cases}$$
where $$\begin{align*} & \Omega & \text{unit square}\\ & \Gamma_{in} & \text{has boundary id 1}\\ & \Gamma_{w} & \text{has boundary id 2}\\ & \Gamma_{C} & \text{has boundary id 3}\\ & \Gamma_{obs} & \text{has interface id 4}\\ & u \in [L^2(\Gamma_C)]^2 & \text{control variable}\\ & v \in [H^1(\Omega)]^2 & \text{state velocity variable}\\ & p \in L^2(\Omega) & \text{state pressure variable}\\ & \alpha_1, \alpha_2 > 0 & \text{penalization parameters}\\ & v_d & \text{desired state}\\ & f & \text{forcing term}\\ & g & \text{inlet profile}\\ \end{align*}$$ using an adjoint formulation solved by a one shot approach.
The test case is from section 5.5 of F. Negri. Reduced basis method for parametrized optimal control problems governed by PDEs. Master thesis, Politecnico di Milano, 2010-2011.
import dolfinx.fem
import dolfinx.io
import dolfinx.mesh
import gmsh
import mpi4py.MPI
import numpy as np
import numpy.typing as npt
import packaging.version
import petsc4py.PETSc
import ufl
import viskex
import multiphenicsx.fem
import multiphenicsx.fem.petsc
mu1 = 1.0
mu2 = np.pi / 5.0
mu3 = np.pi / 6.0
mu4 = 1.0
mu5 = 1.7
mu6 = 2.2
mesh_size = 0.05
Y = 1.0
X = -Y
L = 3.0
B = Y - mu1
H_1 = B + np.tan(mu2) * mu5
H_2 = B - np.tan(mu3) * mu6
L_1 = mu1 * np.cos(mu2) * np.sin(mu2)
L_2 = (B - X) * np.cos(mu3) * np.sin(mu3)
N = mu1 * np.cos(mu2) * np.cos(mu2)
M = - (B - X) * np.cos(mu3) * np.cos(mu3)
gmsh.initialize()
gmsh.model.add("mesh")
p0 = gmsh.model.geo.addPoint(0.0, X, 0.0, mesh_size)
p1 = gmsh.model.geo.addPoint(L - mu4, X, 0.0, mesh_size)
p2 = gmsh.model.geo.addPoint(L, X, 0.0, mesh_size)
p3 = gmsh.model.geo.addPoint(L + mu6 - L_2, H_2 + M, 0.0, mesh_size)
p4 = gmsh.model.geo.addPoint(L + mu6, H_2, 0.0, mesh_size)
p5 = gmsh.model.geo.addPoint(L, B, 0.0, mesh_size)
p6 = gmsh.model.geo.addPoint(L + mu5, H_1, 0.0, mesh_size)
p7 = gmsh.model.geo.addPoint(L + mu5 - L_1, H_1 + N, 0.0, mesh_size)
p8 = gmsh.model.geo.addPoint(L, Y, 0.0, mesh_size)
p9 = gmsh.model.geo.addPoint(L - mu4, Y, 0.0, mesh_size)
p10 = gmsh.model.geo.addPoint(0.0, Y, 0.0, mesh_size)
l0 = gmsh.model.geo.addLine(p0, p1)
l1 = gmsh.model.geo.addLine(p1, p2)
l2 = gmsh.model.geo.addLine(p2, p3)
l3 = gmsh.model.geo.addLine(p3, p4)
l4 = gmsh.model.geo.addLine(p4, p5)
l5 = gmsh.model.geo.addLine(p5, p6)
l6 = gmsh.model.geo.addLine(p6, p7)
l7 = gmsh.model.geo.addLine(p7, p8)
l8 = gmsh.model.geo.addLine(p8, p9)
l9 = gmsh.model.geo.addLine(p9, p10)
l10 = gmsh.model.geo.addLine(p10, p0)
l11 = gmsh.model.geo.addLine(p1, p9)
l12 = gmsh.model.geo.addLine(p2, p5)
l13 = gmsh.model.geo.addLine(p5, p8)
line_loop_rectangle_left = gmsh.model.geo.addCurveLoop([l0, l11, l9, l10])
line_loop_rectangle_right = gmsh.model.geo.addCurveLoop([l1, l12, l13, l8, -l11])
line_loop_bifurcation_top = gmsh.model.geo.addCurveLoop([l5, l6, l7, -l13])
line_loop_bifurcation_bottom = gmsh.model.geo.addCurveLoop([l2, l3, l4, -l12])
rectangle_left = gmsh.model.geo.addPlaneSurface([line_loop_rectangle_left])
rectangle_right = gmsh.model.geo.addPlaneSurface([line_loop_rectangle_right])
bifurcation_top = gmsh.model.geo.addPlaneSurface([line_loop_bifurcation_top])
bifurcation_bottom = gmsh.model.geo.addPlaneSurface([line_loop_bifurcation_bottom])
gmsh.model.geo.synchronize()
gmsh.model.addPhysicalGroup(1, [l10], 1)
gmsh.model.addPhysicalGroup(1, [l0, l1, l2, l4, l5, l7, l8, l9], 2)
gmsh.model.addPhysicalGroup(1, [l3, l6], 3)
gmsh.model.addPhysicalGroup(1, [l11], 4)
gmsh.model.addPhysicalGroup(2, [rectangle_left], 1)
gmsh.model.addPhysicalGroup(2, [rectangle_right], 2)
gmsh.model.addPhysicalGroup(2, [bifurcation_top], 3)
gmsh.model.addPhysicalGroup(2, [bifurcation_bottom], 4)
gmsh.model.mesh.generate(2)
Info : Meshing 1D... Info : [ 0%] Meshing curve 1 (Line) Info : [ 10%] Meshing curve 2 (Line) Info : [ 20%] Meshing curve 3 (Line) Info : [ 30%] Meshing curve 4 (Line) Info : [ 30%] Meshing curve 5 (Line) Info : [ 40%] Meshing curve 6 (Line) Info : [ 50%] Meshing curve 7 (Line) Info : [ 60%] Meshing curve 8 (Line) Info : [ 60%] Meshing curve 9 (Line) Info : [ 70%] Meshing curve 10 (Line) Info : [ 80%] Meshing curve 11 (Line) Info : [ 80%] Meshing curve 12 (Line) Info : [ 90%] Meshing curve 13 (Line) Info : [100%] Meshing curve 14 (Line) Info : Done meshing 1D (Wall 0.00122392s, CPU 0.002048s) Info : Meshing 2D... Info : [ 0%] Meshing surface 1 (Plane, Frontal-Delaunay) Info : [ 30%] Meshing surface 2 (Plane, Frontal-Delaunay) Info : [ 60%] Meshing surface 3 (Plane, Frontal-Delaunay) Info : [ 80%] Meshing surface 4 (Plane, Frontal-Delaunay) Info : Done meshing 2D (Wall 0.0944405s, CPU 0.09461s) Info : 4623 nodes 9335 elements
if packaging.version.Version(dolfinx.__version__) >= packaging.version.Version("0.11.0.dev0"):
partitioner = dolfinx.mesh.create_cell_partitioner( # type: ignore[call-arg, unused-ignore]
dolfinx.mesh.GhostMode.shared_facet, 2)
else:
partitioner = dolfinx.mesh.create_cell_partitioner( # type: ignore[call-arg, unused-ignore]
dolfinx.mesh.GhostMode.shared_facet)
mesh, subdomains, boundaries_and_interfaces, *_ = dolfinx.io.gmsh.model_to_mesh(
gmsh.model, comm=mpi4py.MPI.COMM_WORLD, rank=0, gdim=2, partitioner=partitioner)
gmsh.finalize()
assert subdomains is not None
assert boundaries_and_interfaces is not None
# Create connectivities required by the rest of the code
mesh.topology.create_connectivity(mesh.topology.dim - 1, mesh.topology.dim)
boundaries_1 = boundaries_and_interfaces.indices[boundaries_and_interfaces.values == 1]
boundaries_2 = boundaries_and_interfaces.indices[boundaries_and_interfaces.values == 2]
boundaries_3 = boundaries_and_interfaces.indices[boundaries_and_interfaces.values == 3]
interfaces_4 = boundaries_and_interfaces.indices[boundaries_and_interfaces.values == 4]
boundaries_12 = boundaries_and_interfaces.indices[np.isin(boundaries_and_interfaces.values, (1, 2))]
integration_entities_on_Gamma_obs = dolfinx.fem.compute_integration_domains(
dolfinx.fem.IntegralType.interior_facet, mesh.topology, interfaces_4)
integration_entities_on_Gamma_obs_reshaped = integration_entities_on_Gamma_obs.reshape(-1, 4)
connected_cells_to_Gamma_obs = integration_entities_on_Gamma_obs_reshaped[:, [0, 2]]
subdomain_ordering = (
subdomains.values[connected_cells_to_Gamma_obs[:, 0]] < subdomains.values[connected_cells_to_Gamma_obs[:, 1]])
if len(subdomain_ordering) > 0 and any(subdomain_ordering):
integration_entities_on_Gamma_obs_reshaped[subdomain_ordering] = integration_entities_on_Gamma_obs_reshaped[
subdomain_ordering][:, [2, 3, 0, 1]]
integration_entities_on_Gamma_obs = integration_entities_on_Gamma_obs_reshaped.flatten()
# Define associated measures
ds = ufl.Measure("ds", subdomain_data=boundaries_and_interfaces)
dS = ufl.Measure("dS", domain=mesh, subdomain_data=[(4, np.array(integration_entities_on_Gamma_obs, dtype=np.int32))])
# Normal and tangent
n = ufl.FacetNormal(mesh)
t = ufl.as_vector([n[1], -n[0]])
viskex.dolfinx.plot_mesh(mesh)
2026-02-22 04:50:37.549 ( 1.214s) [ 7F5B311EB140]vtkXOpenGLRenderWindow.:1460 WARN| bad X server connection. DISPLAY=
/dolfinx-env/lib/python3.12/site-packages/viskex/pyvista_plotter.py:196: PyVistaFutureWarning: The default value of `algorithm` for the filter
`UnstructuredGrid.extract_surface` will change in the future. It currently defaults to
`'dataset_surface'`, but will change to `None`. Explicitly set the `algorithm` keyword to
silence this warning.
grid_edges = grid.separate_cells().extract_surface(
viskex.dolfinx.plot_mesh_tags(mesh, boundaries_and_interfaces, "boundaries and interfaces")
Y_velocity = dolfinx.fem.functionspace(mesh, ("Lagrange", 2, (mesh.geometry.dim, )))
Y_pressure = dolfinx.fem.functionspace(mesh, ("Lagrange", 1))
U = dolfinx.fem.functionspace(mesh, ("Lagrange", 2, (mesh.geometry.dim, )))
Q_velocity = Y_velocity.clone()
Q_pressure = Y_pressure.clone()
dofs_Y_velocity = np.arange(0, Y_velocity.dofmap.index_map.size_local + Y_velocity.dofmap.index_map.num_ghosts)
dofs_Y_pressure = np.arange(0, Y_pressure.dofmap.index_map.size_local + Y_pressure.dofmap.index_map.num_ghosts)
dofs_U = dolfinx.fem.locate_dofs_topological(U, boundaries_and_interfaces.dim, boundaries_3)
dofs_Q_velocity = dofs_Y_velocity
dofs_Q_pressure = dofs_Y_pressure
restriction_Y_velocity = multiphenicsx.fem.DofMapRestriction(Y_velocity.dofmap, dofs_Y_velocity)
restriction_Y_pressure = multiphenicsx.fem.DofMapRestriction(Y_pressure.dofmap, dofs_Y_pressure)
restriction_U = multiphenicsx.fem.DofMapRestriction(U.dofmap, dofs_U)
restriction_Q_velocity = multiphenicsx.fem.DofMapRestriction(Q_velocity.dofmap, dofs_Q_velocity)
restriction_Q_pressure = multiphenicsx.fem.DofMapRestriction(Q_pressure.dofmap, dofs_Q_pressure)
restriction = [
restriction_Y_velocity, restriction_Y_pressure, restriction_U, restriction_Q_velocity, restriction_Q_pressure]
(v, p) = (ufl.TrialFunction(Y_velocity), ufl.TrialFunction(Y_pressure))
(w, q) = (ufl.TestFunction(Y_velocity), ufl.TestFunction(Y_pressure))
u = ufl.TrialFunction(U)
r = ufl.TestFunction(U)
(z, b) = (ufl.TrialFunction(Q_velocity), ufl.TrialFunction(Q_pressure))
(s, d) = (ufl.TestFunction(Q_velocity), ufl.TestFunction(Q_pressure))
nu = 0.04
alpha_1 = 0.001
alpha_2 = 0.1 * alpha_1
x = ufl.SpatialCoordinate(mesh)
c = 0.8
v_d = ufl.as_vector((
(c * 10.0 * (x[1]**3 - x[1]**2 - x[1] + 1.0)) + ((1.0 - c) * 10.0 * (-x[1]**3 - x[1]**2 + x[1] + 1.0)),
0.0))
zero_scalar = petsc4py.PETSc.ScalarType(0) # type: ignore[attr-defined]
zero_vector = np.zeros((2, ), dtype=petsc4py.PETSc.ScalarType) # type: ignore[attr-defined]
ff = dolfinx.fem.Constant(mesh, zero_vector)
def g_eval(x: npt.NDArray[np.float64]) -> npt.NDArray[ # type: ignore[name-defined]
petsc4py.PETSc.ScalarType]:
"""Return the parabolic velocity profile at the inlet."""
values = np.zeros((2, x.shape[1]))
values[0, :] = 10.0 * (x[1, :] + 1.0) * (1.0 - x[1, :])
return values
g = dolfinx.fem.Function(Y_velocity)
g.interpolate(g_eval)
bc0 = dolfinx.fem.Function(Y_velocity)
def tracking(v: ufl.Argument, w: ufl.Argument) -> ufl.core.expr.Expr: # type: ignore[no-any-unimported]
"""Return the UFL expression corresponding to the tracking term."""
return ufl.inner(v, w)("-")
def penalty(u: ufl.Argument, r: ufl.Argument) -> ufl.core.expr.Expr: # type: ignore[no-any-unimported]
"""Return the UFL expression corresponding to the penalty term."""
return alpha_1 * ufl.inner(ufl.grad(u) * t, ufl.grad(r) * t) + alpha_2 * ufl.inner(u, r)
a = [[tracking(v, w) * dS(4), None, None, nu * ufl.inner(ufl.grad(z), ufl.grad(w)) * ufl.dx,
- ufl.inner(b, ufl.div(w)) * ufl.dx],
[None, None, None, - ufl.inner(ufl.div(z), q) * ufl.dx, None],
[None, None, penalty(u, r) * ds(3), - ufl.inner(z, r) * ds(3), None],
[nu * ufl.inner(ufl.grad(v), ufl.grad(s)) * ufl.dx, - ufl.inner(p, ufl.div(s)) * ufl.dx,
- ufl.inner(u, s) * ds(3), None, None],
[- ufl.inner(ufl.div(v), d) * ufl.dx, None, None, None, None]]
f = [tracking(v_d, w) * dS(4),
None,
None,
ufl.inner(ff, s) * ufl.dx,
None]
a[0][0] += dolfinx.fem.Constant(mesh, zero_scalar) * ufl.inner(v, w) * (ds(1) + ds(2))
a[3][3] = dolfinx.fem.Constant(mesh, zero_scalar) * ufl.inner(z, s) * (ds(1) + ds(2))
f[1] = ufl.inner(dolfinx.fem.Constant(mesh, zero_scalar), q) * ufl.dx
f[2] = ufl.inner(dolfinx.fem.Constant(mesh, zero_vector), r) * ufl.dx
f[4] = ufl.inner(dolfinx.fem.Constant(mesh, zero_scalar), d) * ufl.dx
bdofs_Y_velocity_1 = dolfinx.fem.locate_dofs_topological(
(Y_velocity, Y_velocity), mesh.topology.dim - 1, boundaries_1)
bdofs_Y_velocity_2 = dolfinx.fem.locate_dofs_topological(
(Y_velocity, Y_velocity), mesh.topology.dim - 1, boundaries_2)
bdofs_Q_velocity_12 = dolfinx.fem.locate_dofs_topological(
(Q_velocity, Y_velocity), mesh.topology.dim - 1, boundaries_12)
bc = [dolfinx.fem.dirichletbc(g, bdofs_Y_velocity_1, Y_velocity),
dolfinx.fem.dirichletbc(bc0, bdofs_Y_velocity_2, Y_velocity),
dolfinx.fem.dirichletbc(bc0, bdofs_Q_velocity_12, Q_velocity)]
(v, p) = (dolfinx.fem.Function(Y_velocity), dolfinx.fem.Function(Y_pressure))
u = dolfinx.fem.Function(U)
(z, b) = (dolfinx.fem.Function(Q_velocity), dolfinx.fem.Function(Q_pressure))
J = 0.5 * tracking(v - v_d, v - v_d) * dS(4) + 0.5 * penalty(u, u) * ds(3)
J_cpp = dolfinx.fem.form(J)
# Extract state forms from the optimality conditions
a_state = [[ufl.replace(a[i][j], {s: w, d: q}) if a[i][j] is not None else None for j in (0, 1)] for i in (3, 4)]
f_state = [ufl.replace(f[i], {s: w, d: q}) for i in (3, 4)]
bc_state = [bc[0], bc[1]]
restriction_state = [restriction[i] for i in (3, 4)]
petsc_options = {
"ksp_type": "preonly",
"pc_type": "lu",
"pc_factor_mat_solver_type": "mumps",
"ksp_error_if_not_converged": True,
}
problem_state = multiphenicsx.fem.petsc.LinearProblem(
a_state, f_state, bcs=bc_state, u=(v, p),
petsc_options_prefix="tutorial_7b_stokes_neumann_control_state_", petsc_options=petsc_options,
kind="mpi", restriction=restriction_state
)
problem_state.solve()
del problem_state
J_uncontrolled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Uncontrolled J =", J_uncontrolled)
assert np.isclose(J_uncontrolled, 2.8479865)
Uncontrolled J = 2.8479942831821674
viskex.dolfinx.plot_vector_field(v, "uncontrolled state velocity", glyph_factor=1e-2)
viskex.dolfinx.plot_scalar_field(p, "uncontrolled state pressure")
/dolfinx-env/lib/python3.12/site-packages/viskex/pyvista_plotter.py:196: PyVistaFutureWarning: The default value of `algorithm` for the filter `UnstructuredGrid.extract_surface` will change in the future. It currently defaults to `'dataset_surface'`, but will change to `None`. Explicitly set the `algorithm` keyword to silence this warning. grid_edges = grid.separate_cells().extract_surface(
problem = multiphenicsx.fem.petsc.LinearProblem(
a, f, bcs=bc, u=(v, p, u, z, b),
petsc_options_prefix="tutorial_7b_stokes_neumann_control_", petsc_options=petsc_options,
kind="mpi", restriction=restriction
)
problem.solve()
del problem
J_controlled = mesh.comm.allreduce(dolfinx.fem.assemble_scalar(J_cpp), op=mpi4py.MPI.SUM)
print("Optimal J =", J_controlled)
assert np.isclose(J_controlled, 1.7643950)
Optimal J = 1.7643940719679696
viskex.dolfinx.plot_vector_field(v, "state velocity", glyph_factor=1e-2)
viskex.dolfinx.plot_scalar_field(p, "state pressure")
/dolfinx-env/lib/python3.12/site-packages/viskex/pyvista_plotter.py:196: PyVistaFutureWarning: The default value of `algorithm` for the filter `UnstructuredGrid.extract_surface` will change in the future. It currently defaults to `'dataset_surface'`, but will change to `None`. Explicitly set the `algorithm` keyword to silence this warning. grid_edges = grid.separate_cells().extract_surface(
viskex.dolfinx.plot_vector_field(u, "control", glyph_factor=1e-1)
viskex.dolfinx.plot_vector_field(z, "adjoint velocity", glyph_factor=1e-1)
viskex.dolfinx.plot_scalar_field(b, "adjoint pressure")
/dolfinx-env/lib/python3.12/site-packages/viskex/pyvista_plotter.py:196: PyVistaFutureWarning: The default value of `algorithm` for the filter `UnstructuredGrid.extract_surface` will change in the future. It currently defaults to `'dataset_surface'`, but will change to `None`. Explicitly set the `algorithm` keyword to silence this warning. grid_edges = grid.separate_cells().extract_surface(